package org.budo.warehouse.logic.consumer.async;

import java.util.List;
import java.util.Map;

import javax.annotation.Resource;

import org.budo.dubbo.protocol.async.repository.AbstractAsyncRepository;
import org.budo.dubbo.protocol.async.repository.BudoAsyncInvocation;
import org.budo.graph.annotation.SpringGraph;
import org.budo.support.lang.util.StringUtil;
import org.budo.support.servlet.util.QueryStringUtil;
import org.budo.warehouse.logic.api.AbstractDataConsumerWrapper;
import org.budo.warehouse.logic.api.DataEntry;
import org.budo.warehouse.logic.api.DataMessage;
import org.budo.warehouse.logic.bean.LogicDynamicBeanProvider;
import org.budo.warehouse.logic.util.DataMessageLogicUtil;
import org.budo.warehouse.logic.util.PipelineUtil;
import org.budo.warehouse.service.api.IDataNodeService;
import org.budo.warehouse.service.entity.DataNode;
import org.budo.warehouse.service.entity.Pipeline;
import org.springframework.aop.framework.AopContext;

import com.alibaba.fastjson.JSON;

import lombok.Getter;
import lombok.Setter;

/**
 * @author limingwei
 */
@Getter
@Setter
public class AsyncDataConsumerWrapper extends AbstractDataConsumerWrapper {
    @Resource
    private IDataNodeService dataNodeService;

    @Resource
    private LogicDynamicBeanProvider logicDynamicBeanProvider;

    @SpringGraph
    @Override
    public void consume(DataMessage dataMessage) {
        Pipeline pipeline = this.getPipeline();
        DataNode targetDataNode = dataNodeService.findByIdCached(pipeline.getTargetDataNodeId());

        String url = targetDataNode.getUrl();
        if (!StringUtil.startsWith(url, "async:")) { // 不是异步
            this.getDataConsumer().consume(dataMessage);
            return;
        }

        AsyncDataConsumerWrapper _this = (AsyncDataConsumerWrapper) AopContext.currentProxy();

        String _format = QueryStringUtil.getParameter(url, "_format", "");
        if ("v1".equals(_format)) {
            _this.send_message_v1(dataMessage);
            return;
        }

        if ("qc".equals(_format)) {
            _this.send_message_qc(dataMessage);
            return;
        }

        // 新版,默认版
        DataMessage dataMessagePojo = DataMessageLogicUtil.toMessagePojo(dataMessage, pipeline);
        this.getDataConsumer().consume(dataMessagePojo);
    }

    @SpringGraph
    public void send_message_qc(DataMessage dataMessage) {
        Pipeline pipeline = this.getPipeline();

        AbstractAsyncRepository abstractAsyncRepository = (AbstractAsyncRepository) logicDynamicBeanProvider.asyncRepository(pipeline);
        String destinationName = PipelineUtil.destinationName(pipeline);

        List<DataEntry> dataEntries = dataMessage.getDataEntries();
        for (DataEntry dataEntry : dataEntries) {
            Integer rowCount = dataEntry.getRowCount();
            for (int rowIndex = 0; rowIndex < rowCount; rowIndex++) {
                Map<String, Object> qc_row = AsyncMessageUtil.to_qc_row(dataEntry, rowIndex);
                String json = JSON.toJSONString(qc_row);
                abstractAsyncRepository.sendRawMessage(destinationName, json);
            }
        }
    }

    @SpringGraph
    public void send_message_v1(DataMessage dataMessage) {
        Pipeline pipeline = this.getPipeline();

        AbstractAsyncRepository abstractAsyncRepository = (AbstractAsyncRepository) logicDynamicBeanProvider.asyncRepository(pipeline);
        String destinationName = PipelineUtil.destinationName(pipeline);

        BudoAsyncInvocation budoAsyncInvocation = AsyncMessageUtil.invocation_v1(dataMessage);

        abstractAsyncRepository.sendMessage(destinationName, budoAsyncInvocation);
    }
}